3. HDSF

HDFS 优缺点

优点

  1. 高容错性,数据自动保存多个副本,某个副本丢失之后,可以自动恢复。
  2. 适合处理大数据,能处理规模达到 GB,TB 甚至 PB 以上级别的数据
  3. 构建在廉价机器上

缺点

  1. 不适合低延迟数据访问
  2. 不适合存储大量的小文件。HDFS存储了大量的小文件,会降低NN的服务能力,NN负责文件元数据(属性,块的映射) 的管理,NN在运行时,必须将当前集群中存储所有文件的元数据全部加载到内存,而 NN 的内存是有限的。
  3. 不支持文件的并发写入,文件的随机修改。一个文件同一时刻只能由一个客户端进行写入,不允许多个线程同时写,而且仅支持对数据的追加,不支持随机修改。

组成架构

NameNode:

  1. 管理 HDFS 的名称空间
  2. 配置副本策略
  3. 管理数据块映射信息
  4. 处理客户端读写请求

DataNode

  1. 存储实际的数据块
  2. 执行数据块的读写操作

Client

  1. 文件切分。文件上传 HDFS 的时候, Client 将文件切分成一个一个的 Block, 然后进行上传
  2. 与 NameNode 交互,获取文件的位置信息
  3. 与 DataNode 交互, 读取或者写入数据
  4. Client 提供一些命令来管理 HDFS, 比如 NameNode 格式化
  5. Client 可以通过一些命令来访问 HDFS, 比如对 HDFS 增删查改操作

Secondary NameNode

  1. 并非 NameNode 的热备。当 NameNode 挂掉 的时候,它并不能马上替换 NameNode 并提供服务
  2. 可以辅助 NameNode,分担其工作量,比如定期合并 Fsimage 和 Edits,并推送给 NameNode
  3. 在紧急情况下,可辅助恢复 NameNode

块大小

HDFS中的文件在物理,上是分块存储(Block) ,块的大小可以通过配置参数(dfs.blocksize)来规定,默认大小在Hadoop2.x 版本中是128M(每个块最多存储128M的数据,如果当前块存储的数据不满128M存了多少数据,就占用多少的磁盘空间,一个块只属于一个文件),老版本中是64M。

为什么块大小不能设置太小,也不能设置太大。决定因素是什么?

  1. HDFS 的块设置太小,会增加寻址时间,程序一直在找块的开始位置。
  2. 如果块设置的太大,从磁盘传输数据的时间会明显大于定位这个块开始位置所需的时间。导致程序在处理这块数据时,会非常慢。

总结: HDFS块的大小设置主要取决于磁盘传输速率。(如果寻址时间约为10ms, 即查找到目标block的时间为10ms。寻址时间为传输时间的1%时, 则为最佳状态。因此,传输时间=10ms/0.01=1000ms=1s,而目前磁盘的传输速率普遍为100MB/s。所以块的大小取 2 的 n次方最接近 100 的值,即 128M)

常用命令

  • help:输出这个命令参数
1
2
3
4
5
6
7
8
9
10
11
[rexyan@hadoop10 ~]$ hadoop fs -help put
-put [-f] [-p] [-l] <localsrc> ... <dst> :
Copy files from the local file system into fs. Copying fails if the file already
exists, unless the -f flag is given.
Flags:

-p Preserves access and modification times, ownership and the mode.
-f Overwrites the destination if it already exists.
-l Allow DataNode to lazily persist the file to disk. Forces
replication factor of 1. This flag will result in reduced
durability. Use with care.
  • ls: 显示目录信息
1
2
3
4
5
6
7
8
[rexyan@hadoop10 ~]$ hadoop fs -ls /
Found 6 items
drwx------ - rexyan supergroup 0 2020-07-12 19:47 /tmp
drwxr-xr-x - rexyan supergroup 0 2020-07-12 14:48 /wcinput
drwxr-xr-x - rexyan supergroup 0 2020-07-12 14:53 /wcinput2
drwxr-xr-x - rexyan supergroup 0 2020-07-12 18:33 /wcinput3
drwxr-xr-x - rexyan supergroup 0 2020-07-12 18:36 /wcoutput3
drwxr-xr-x - rexyan supergroup 0 2020-07-12 19:47 /wcoutput4
  • mkdir:在HDFS上创建目录
1
2
3
4
5
6
7
8
9
10
[rexyan@hadoop10 ~]$ hadoop fs -mkdir /test_command
[rexyan@hadoop10 ~]$ hadoop fs -ls /
Found 7 items
drwxr-xr-x - rexyan supergroup 0 2020-07-22 23:44 /test_command
drwx------ - rexyan supergroup 0 2020-07-12 19:47 /tmp
drwxr-xr-x - rexyan supergroup 0 2020-07-12 14:48 /wcinput
drwxr-xr-x - rexyan supergroup 0 2020-07-12 14:53 /wcinput2
drwxr-xr-x - rexyan supergroup 0 2020-07-12 18:33 /wcinput3
drwxr-xr-x - rexyan supergroup 0 2020-07-12 18:36 /wcoutput3
drwxr-xr-x - rexyan supergroup 0 2020-07-12 19:47 /wcoutput4
  • moveFromLocal:从本地剪切文件粘贴到HDFS
1
2
[rexyan@hadoop10 ~]$ echo "hadoop test command" >> command.txt  # 创建文件 command.txt
[rexyan@hadoop10 ~]$ hadoop fs -moveFromLocal command.txt /test_command # 将文件剪切到 hdfs 中
  • copyFromLocal:从本地文件系统中拷贝文件到HDFS路径去
1
2
[rexyan@hadoop10 ~]$ echo "hadoop test command" >> command2.txt 
[rexyan@hadoop10 ~]$ hadoop fs -copyFromLocal command2.txt /test_command
  • put:等同于copyFromLocal
1
2
[rexyan@hadoop10 ~]$ echo "hadoop test command" >> command3.txt  
[rexyan@hadoop10 ~]$ hadoop fs -put command3.txt /test_command
  • copyToLocal:从HDFS拷贝到本地
1
[rexyan@hadoop10 ~]$ hadoop fs -copyToLocal /test_command/command.txt /tmp/  # 将文件从 HDFS 上拷贝到本地 /tmp 目录中
  • get:等同于copyToLocal,就是从HDFS下载文件到本地
1
[rexyan@hadoop10 ~]$ hadoop fs -get /test_command/command2.txt /tmp/
  • appendToFile:追加一个文件到已经存在的文件末尾
1
2
3
4
5
6
7
8
[rexyan@hadoop10 ~]$ hadoop fs -cat /test_command/command.txt # 先查看文件内容
hadoop test command
[rexyan@hadoop10 ~]$ echo "append content" >> append.txt # 创建一个文件
[rexyan@hadoop10 ~]$ hadoop fs -appendToFile append.txt /test_command/command.txt # 追加文件内容
[rexyan@hadoop10 ~]$ hadoop fs -cat /test_command/command.txt # 查询追加的文件的内容
hadoop test command
append content
[rexyan@hadoop10 ~]$
  • cat:显示文件内容
1
2
3
[rexyan@hadoop10 ~]$ hadoop fs -cat /test_command/command.txt # 查看文件的内容
hadoop test command
append content
  • chgrp 、-chmod、-chown:Linux文件系统中的用法一样,修改文件所属权限。
1
2
[rexyan@hadoop10 ~]$ hadoop fs -chmod 666 /test_command/command.txt  # 修改权限
[rexyan@hadoop10 ~]$ hadoop fs -chown zhangsan:zhangsan /test_command/command.txt # 修改所属人和组
  • cp :从HDFS的一个路径拷贝到HDFS的另一个路径
1
2
3
4
5
[rexyan@hadoop10 ~]$ hadoop fs -cp /test_command/command.txt /wcinput  # 拷贝文件
[rexyan@hadoop10 ~]$ hadoop fs -ls /wcinput # 查看结果
Found 1 items
-rw-r--r-- 3 rexyan supergroup 35 2020-07-23 00:01 /wcinput/command.txt
[rexyan@hadoop10 ~]$
  • mv:在HDFS目录中移动文件
1
rexyan@hadoop10 ~]$ hadoop fs -mv /test_command/command3.txt /wcinput
  • tail:显示一个文件的末尾
1
2
3
4
[rexyan@hadoop10 ~]$ hadoop fs -tail  /wcinput/command.txt   
hadoop test command
append content
[rexyan@hadoop10 ~]$
  • rm:删除文件或文件夹
1
[rexyan@hadoop10 ~]$ hadoop fs -rm  /wcinput/command.txt
  • rmdir:删除空目录
1
[rexyan@hadoop10 ~]$ hadoop fs -rmdir /wcinput2
  • du统计文件夹的大小信息
1
2
3
4
5
6
[rexyan@hadoop10 ~]$ hadoop fs -du /test_command   # 查看 test_command 目录下每个文件的大小
35 /test_command/command.txt
20 /test_command/command2.txt
[rexyan@hadoop10 ~]$ hadoop fs -du -s /test_command # 查看 test_command 目录的大小
55 /test_command
[rexyan@hadoop10 ~]$
  • setrep:设置HDFS中文件的副本数量。这里设置的副本数只是记录在NameNode的元数据中,是否真的会有这么多副本,还得看DataNode的数量。因为目前只有3台设备,最多也就3个副本,只有节点数的增加到10台时,副本数才能达到10。
1
2
[rexyan@hadoop10 ~]$ hadoop fs -setrep 2 /test_command/command.txt
Replication 2 set: /test_command/command.txt

Java Client

创建目录 (使用代码配置)

参数优先级排序:客户端代码中设置的值 > ClassPath下的用户自定义配置文件 > 然后是服务器的默认配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.yanrs.hdfs;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class HDFSClient {
@Test
public void testMkdirs() throws IOException, URISyntaxException, InterruptedException {
// 1 获取文件系统
Configuration configuration = new Configuration();

// 配置在集群上运行 方式1
// configuration.set("fs.defaultFS", "hdfs://hadoop102:9000");
// FileSystem fs = FileSystem.get(configuration);

// 配置在集群上运行 方式2
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop10:9000"), configuration, "rexyan");

// 2 创建目录
fs.mkdirs(new Path("/test_java_client"));
// 3 关闭资源
fs.close();
}
}

创建目录 (使用配置文件配置)

参数优先级排序:客户端代码中设置的值 > ClassPath下的用户自定义配置文件 > 然后是服务器的默认配置

创建 hdfs-site.xml 配置文件,内容如下:

1
2
3
4
5
6
7
8
9
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>

测试上传文件,上传后看副本数量是否是配置文件中设置的1个

1
2
3
4
5
6
7
@Test
public void testMkdirConfigForXml() throws IOException, URISyntaxException, InterruptedException {
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop10:9000"), configuration, "rexyan");
fs.copyFromLocalFile(false, true, new Path("/tmp/testFile"), new Path("/test_java_client"));
fs.close();
}

查看效果

重命名

1
2
3
4
5
6
7
@Test
public void testRename() throws IOException, URISyntaxException, InterruptedException {
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop10:9000"), configuration, "rexyan");
fs.rename(new Path("/test_java_client/testFile"), new Path("/test_java_client/newTestFile"));
fs.close();
}

查看文件详情

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Test
public void testFileInfo() throws IOException, URISyntaxException, InterruptedException {
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop10:9000"), configuration, "rexyan");

// 递归获取 / 目录下的文件信息
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
while (listFiles.hasNext()){
LocatedFileStatus status = listFiles.next();

System.out.println("-----------" + status.getPath().getName() + "-----------" );
System.out.println(status.getAccessTime());
System.out.println(status.getBlockSize());
System.out.println(status.getModificationTime());
System.out.println(status.getGroup());
System.out.println(status.getOwner());
System.out.println(status.getLen());
System.out.println(status.getPermission());
System.out.println(status.getReplication());

// 获取文件块信息
BlockLocation[] blockLocations = status.getBlockLocations();
for (BlockLocation blockLocation:blockLocations) {
// 块名称
String[] names = blockLocation.getNames();
for (String name:names) {
System.out.println(name);
}
// 块所在机器IP
String[] hosts = blockLocation.getHosts();
for (String host:hosts) {
System.out.println(host);
}
// 块大小
System.out.println(blockLocation.getLength());
// 块偏移量
System.out.println(blockLocation.getOffset());
}
}
// 3 关闭资源
fs.close();
}

判断是文件还是文件夹

1
2
3
4
5
6
7
@Test
public void testIsFile() throws IOException, URISyntaxException, InterruptedException {
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop10:9000"), configuration, "rexyan");
System.out.println(fs.isDirectory(new Path("/test_java_client/newTestFile")));
System.out.println(fs.isFile(new Path("/test_java_client/newTestFile")));
}

文件下载

1
2
3
4
5
6
@Test
public void testDownFile() throws IOException, URISyntaxException, InterruptedException {
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop10:9000"), configuration, "rexyan");
fs.copyToLocalFile(new Path("/test_java_client/newTestFile"), new Path("/tmp/newTestFile"));
}

文件夹删除

1
2
3
4
5
6
@Test
public void testRemoveFile() throws IOException, URISyntaxException, InterruptedException {
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop10:9000"), configuration, "rexyan");
fs.delete(new Path("/test_java_client/newTestFile"), true);
}

自定义上传

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
public void testCustomizeUpload() throws IOException, URISyntaxException, InterruptedException {
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop10:9000"), configuration, "rexyan");

// 创建输入流
FileInputStream fileInputStream = new FileInputStream(new File("/tmp/testFile"));
// 创建输出流
FSDataOutputStream fsDataOutputStream = fs.create(new Path("/test_java_client/uploadFile"));
// 流的拷贝
IOUtils.copyBytes(fileInputStream, fsDataOutputStream, 1024);

// 关闭资源
IOUtils.closeStream(fsDataOutputStream);
IOUtils.closeStream(fileInputStream);
fs.close();
}

自定义下载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
public void testCustomizeDown() throws IOException, URISyntaxException, InterruptedException {
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop10:9000"), configuration, "rexyan");
// 获取输入流
FSDataInputStream dataInputStream = fs.open(new Path("/test_java_client/uploadFile"));
// 创建输出流
FileOutputStream fileOutputStream = new FileOutputStream(new File("/tmp/downFile"));
// 流的拷贝
IOUtils.copyBytes(dataInputStream, fileOutputStream, 1024);

// 关闭资源
IOUtils.closeStream(dataInputStream);
IOUtils.closeStream(fileOutputStream);
fs.close();
}

定位下载

先上传一个大于 128M 的文件到 HDFS 上

下载第一块文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void testFirstBlockDown() throws IOException, URISyntaxException, InterruptedException {
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop10:9000"), configuration, "rexyan");
// 获取输入流
FSDataInputStream dataInputStream = fs.open(new Path("/test_java_client/bigfile"));
// 创建输出流
FileOutputStream fileOutputStream = new FileOutputStream(new File("/tmp/bigfile.part1"));

// 下载 1024 * 128(也就是 128M 的文件的大小)
byte[] bytes = new byte[1024];
for (int i = 0; i < 1024 * 128; i++) {
dataInputStream.read(bytes);
fileOutputStream.write(bytes);
}
// 关闭资源
IOUtils.closeStream(dataInputStream);
IOUtils.closeStream(fileOutputStream);
fs.close();
}

下载第二块文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
public void testSecondBlockDown() throws IOException, URISyntaxException, InterruptedException {
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop10:9000"), configuration, "rexyan");
// 获取输入流
FSDataInputStream dataInputStream = fs.open(new Path("/test_java_client/bigfile"));
// 创建输出流
FileOutputStream fileOutputStream = new FileOutputStream(new File("/tmp/bigfile.part2"));
// 定位数据文件
dataInputStream.seek(1024*1024*128);
// 拷贝下载 128M 之后的数据
IOUtils.copyBytes(dataInputStream, fileOutputStream, 1024);
// 关闭资源
IOUtils.closeStream(dataInputStream);
IOUtils.closeStream(fileOutputStream);
fs.close();
}

代码地址

Python Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from hdfs import InsecureClient

client = InsecureClient('http://hadoop10:50070', user='rexyan')


def get_file_content(file_path):
"""
读取文件内容
:param file_path:
:return:
"""
with client.read(file_path, encoding="utf-8", buffer_size=1024) as reader:
context = reader.read()
return context


def upload_file(local_path, origin_path):
"""
上传本地文件到 HDFS 中
:param local_path:
:param origin_path:
:return:
"""
with open(local_path, encoding="utf-8") as reader, client.write(origin_path, encoding="utf-8") as writer:
for content in reader:
writer.write(content)


def get_file_info(origin_path):
"""
获取文件信息
:param origin_path:
:return:
"""
# 文件/路径 信息
content = client.content(origin_path)
print(content)

# 文件夹下所有文件名称
fnames = client.list(origin_path)
print(fnames)

# 文件/路径 状态信息
status = client.status(origin_path)
print(status)

# 重命名
# client.rename('dat/features', 'features')
# 删除文件
# client.delete('dat', recursive=True)


if __name__ == '__main__':
# print(get_file_content("/test_java_client/uploadFile"))
# upload_file("/tmp/testFile", "/test_python_client/uploadFile")
get_file_info("/")

更多信息可参考 此处

代码地址

HDFS的写数据流程

正常写流程

  1. 服务端启动 HDFS 中的 NN 和 DN 进程
  2. 客户端创建一个分布式文件系统客户端,由客户端向 NN 发送请求,请求上传文件
  3. NN处理请求,检查客户端是否有权限上传,路径是否合法等
  4. 检查通过,NN 响应客户端可以上传
  5. 客户端根据自己设置的块大小,开始上传第一个块,默认 0-128M, NN 根据客户端上传文件的副本数(默认为3),根据机架感知策略选取指定数量的 DN节点返回
  6. 客户端根据返回的 DN节点,请求建立传输通道,客户端向最近(网络距离最近)的DN节点发起通道建立请求,由这个DN节点依次向通道中的(距离当前DN距离最近)下一个节点发送建立通道请求,各个节点发送响应 ,通道建立成功
  7. 客户端开始往第一个 DN 上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以 Packet 为单位(64K的数据),第一个 DN 收到一个 Packet 就会传给第二个 DN,第二个DN 收到后传给第三个 DN;第一个 DN 每传一个 packet 会放入一个应答队列等待应答。
  8. 一个 Block 块的数据传输完成之后,通道关闭,DN 向 NN上报消息,已经收到某个块
  9. 第一个 Block 块传输完成,第二块开始传输,依次重复⑤-⑧,直到最后一个块传输完成,NN向客户端响应传输完成!

异常写流程

  1. 1-6 步骤同上
  2. 客户端每读取64K的数据,封装为一个packet,封装成功的packet,放入到一个队列中,这个队列称为dataQuene(待发送数据包) 在发送时,先将dataQuene 中的 packet 按顺序发送,发送后再放入到 ackquene (正在发送的队列) 中。每个节点在收到 packe t后,都会向客户端发送 ack 确认消息。如果一个 packet 在发送后,已经收到了所有 DN 返回的 ack 确认消息,这个 packet 会在 ackquene 中删除,假如一个 packet 在发送后,未收到 DN 返回的 ack 确认消息,则传输中止,ackquene 中的 packet 会回滚到 dataQuene。然后重新建立通道,剔除坏的DN节点。建立完成之后,继续传输!
    只要有一个 DN 节点收到了数据,DN 上报 NN 已经接收完此块,NN就认为当前块已经传输成功,NN会自动维护副本数!

机架感知

在 HDFS 写数据的过程中,NameNode会选择距离待上传数据最近距离的 DataNode 接收数据。那么这个最近距离怎么计算呢?

Hadoop 2.7.2 副本选择

默认是三个副本,在客户端上传的时候可以选择上传的大小和副本的数量。查看 hdfs-default.xml 文件,可以看到副本数默认为 3

在 shell 段进行文件上传的时候,可以使用 -D+配置名称 来设置副本数量

1
hadoop fs -Ddfs.replication=1 -put xsync /replication_test

第一个副本在 client 所在的节点上,如果客户端在集群外,那么就随机选择一个节点

第二个副本和第一个副本要位于相同的机架,但是节点随机

第三个副本位于不同的机架,节点随机

HDFS 读数据流程

  1. 客户端通过Distributed FileSystem向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址。
  2. 挑选一台DataNode(就近原则)服务器,请求读取数据。
  3. DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验)。
  4. 客户端以Packet为单位接收,先在本地缓存,然后写入目标文件。

NN 和 2NN 工作机制

Fsimage:NameNode 内存中元数据序列化后形成的文件。

Edits:记录客户端更新元数据信息的每一步操作(可通过 Edits 运算出元数据)。NameNode 启动时,先滚动Edits 并生成一个空的 edits.inprogress,然后加载 Edits 和 Fsimage 到内存中,此时 NameNode 内存就持有最新的元数据信息。Client 开始对 NameNode 发送元数据的增删改的请求,这些请求的操作首先会被记录到edits.inprogress 中(查询元数据的操作不会被记录在 Edits 中,因为查询操作不会更改元数据信息),如果此时NameNode 挂掉,重启后会从 Edits 中读取元数据的信息。然后,NameNode 会在内存中执行元数据的增删改的操作。

由于 Edits 中记录的操作会越来越多,Edits 文件会越来越大,导致 NameNode 在启动加载 Edits 时会很慢,所以需要对Edits和Fsimage进行合并(所谓合并,就是将Edits和Fsimage加载到内存中,照着Edits中的操作一步步执行,最终形成新的Fsimage)。SecondaryNameNode的作用就是帮助NameNode进行Edits和Fsimage的合并工作。SecondaryNameNode 首先会询问 NameNode 是否需要 CheckPoint(触发 CheckPoint 需要满足两个条件中的任意一个,定时时间【SecondaryNameNode每隔一小时执行一次】到和Edits中数据写满了【一分钟检查一次操作次数,当操作次数达到1百万时,SecondaryNameNode执行一次】)。SecondaryNameNode 执行 CheckPoint 操作,首先会让 NameNode 滚动 Edits 并生成一个空的 edits.inprogress,滚动 Edits 的目的是给 Edits 打个标记,以后所有新的操作都写入 edits.inprogress,其他未合并的 Edits 和 Fsimage 会拷贝到 SecondaryNameNode 的本地,然后将拷贝的 Edits 和 Fsimage 加载到内存中进行合并,生成 fsimage.chkpoint,然后将 fsimage.chkpoint 拷贝给 NameNode,重命名为 Fsimage 后替换掉原来的 Fsimage。NameNode 在启动时就只需要加载之前未合并的Edits 和 Fsimage 即可,因为合并过的 Edits 中的元数据信息已经被记录在 Fsimage 中。

CheckPoint

触发条件

拉取 /opt/module/hadoop-2.7.2/share/hadoop/hdfs/hadoop-hdfs-2.7.2.jar jar 包,解压得到 hdfs-default.xml 文件。可以看到出触发 CheckPoint 的设置

目录切换至 10 机器 /opt/module/hadoop-2.7.2/data/tmp/dfs/name/current 查看 edits 和 fsimage 信息

namenode 在启动的时候也会检查是否进行 CheckPoint,如果满足条件,那么也会进行 CheckPoint。从下图可以看出,启动 namenode 会先加载 fsimage 文件,然后加载 edits 文件,之后进行 CheckPoint 检查,满足条件就将 edits 合并到一个新的 fsimage 文件中,之后 HDFS 会进行安全模式,安全模式中只能读取部分数据,不能写数据。

手动执行

也可以进入安全模式后,手动的进行 CheckPoint,让其生成新的 fsimage 文件。生成新的 fsimage 文件后,之前原来的 fsimage 会被删除【最多保存两个 fsimage】。

1
2
3
4
hadoop dfsadmin -safemode get  # 获取安全模式状态
hadoop dfsadmin -safemode enter # 进入安全模式
hadoop dfsadmin -saveNamespace # 手动进行 CheckPoint
hadoop dfsadmin -safemode leave # 离开安全模式

NN 中存储的信息

NN 的元数据分为两部分,一部分是 inodes 信息,该信息是记录在 fsimage 文件中或者 edits 文件中的。另一部分是 blocklist 块的位置信息,这部分信息不是记录在 fsimage 文件中或者 edits 文件中的,而是每次 DN 在启动后自动上报的。

停止所有的 datanode 和 namenode,然后单独启动 namenode,可以看到在安全模式中,namenode 在等待 datanode 上传 blocklist 块的位置信息。

1
2
3
hadoop-daemon.sh stop datanode  # 在 10,11,12 上停止 datanode 
hadoop-daemon.sh stop namenode # 在 10 上停止 datanode
hadoop-daemon.sh start namenode # 在 10 上启动 namenode

即下图中,红色部分的 inodes 信息是记录在 fsimage 文件中或者 edits 文件中的。蓝色部分的 blocklist 块位置信息是由 datanode 上传的。

查看 fsimage 内容

在 10 机器上执行以下命令,oiv 代表查看 fsimage 内容,-p 指定输出格式为 XML,-i 指定 fsimage,-o 是将结果文件的位置

1
hdfs oiv -p XML -i fsimage_0000000000000000916 -o ~/fsimage.xml

查看 fsimage.xml 文件,内容如下,可以看到只有一些 inodes 数据信息,但是没有块信息,因为块信息是各个 datanode 节点上传的。

查看 Edits 内容

在 10 机器上执行以下命令,oev 代表查看 edits 内容,-p 指定输出格式为 XML,-i 指定 edits,-o 是将结果文件的位置

1
hdfs oev -p XML -i edits_0000000000000000866-0000000000000000878  -o ~/edits.xml

安全模式

NN在启动时,当NN将所有的元数据加载完成后,等待DN来上报块的信息,当NN中所保存的所有块的最小副本数(默认为1) / 块的总数 > 99.99%时,NN会自动离开安全模式。在安全模式,客户端只能进行有限读操作【已经上报完成的块信息可以被读取】,但是不能进行写操作。

下图中每个 datanode 都有 10 个块

然后在 10 机器上上传3个文件,副本数设置为1,根据机架感知,这三个文件都会在 10 机器上,块信息如下:

然后停止 hdfs 服务,停止 namenode 和 datanode。再次重新启动 namenode,11 和 12 机器上的 datanode,会发现 hdfs 服务会一直处于安全模式,因为 11 或 12 机器上的所有块的最小副本数之和为 10(10个块,每个块至少一个副本)/ 13 不大于 99.99%,所以会处于安全模式。

还可以手动进入和离开安全模式

1
2
3
hadoop dfsadmin -safemode get  # 获取安全模式状态
hadoop dfsadmin -safemode enter # 进入安全模式
hadoop dfsadmin -safemode leave # 离开安全模式

元数据的恢复和备份

从 2NN 恢复元数据

停止 hdfs 服务,删除 10 机器 namenode 的 current 元数据目录

1
2
3
4
[rexyan@hadoop10 name]$ stop-dfs.sh
[rexyan@hadoop10 name]$ pwd
/opt/module/hadoop-2.7.2/data/tmp/dfs/name
[rexyan@hadoop10 name]$ rm -rf current/

将 12 机器上的 2NN 的数据拷贝到 10 上来

1
[rexyan@hadoop10 name]$ scp -r rexyan@hadoop12:/opt/module/hadoop-2.7.2/data/tmp/dfs/namesecondary/current ./

重新启动 namenode 即可。因为 2nn 并不是实时同步 nn 的数据的,所以可能会有部分元数据丢失。

1
hadoop-daemon.sh start namenode

如果出现元数据丢失,那么启动的时候 hdfs 服务会一直处于安全模式,我们可以使用 hadoop dfsadmin -safemode leave 离开安全模式,离开后删除坏的块信息就能解决问题。

备份元数据

为了防止 nn 存储数据的磁盘出现问题,在存储元数据时,可以配置多个存储元数据的目录,即可以将元数据存储在不同磁盘上。

修改 namenode 所在 10 机器的 hdfs-site.xml 文件,添加如下配置,tmp/dfs/data1tmp/dfs/data2 分别为两个备份目录

1
2
3
4
<property>
<name>dfs.datanode.data.dir</name>
<value>file:///tmp/dfs/data1,file:///tmp/dfs/data2</value>
</property>

然后查看 /tmp/dfs/data1/tmp/dfs/data2 目录,就能看到备份的数据。

DN 工作机制

  1. 一个数据块在 DataNode 上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。
  2. DataNode 启动后向 NameNode 注册,通过后,周期性(1小时)的向 NameNode 上报所有的块信息。
  3. 心跳是每3秒一次,心跳返回结果带有 NameNode 给该 DataNode 的命令如复制块数据到另一台机器,或删除某个数据块。如果超过10分钟没有收到某个 DataNode 的心跳,则认为该节点不可用。
  4. 集群运行中可以安全加入和退出一些机器。

DN 不可用的场景

DN 和 NN 之间的心跳是 3 秒一次,超过 3s 后并不会认为该 DN 不可用,而是得等到 15 分钟后才将 DN 标记为死亡状态

DN 数据完整性

DN 在将数据块信息上报 NN 的时候,会进行数据完整性的校验。即当 DataNode 读取 Block 的时候,它会计算CheckSum值,如果计算后的 CheckSum,与 Block 创建时值不一样,说明 Block 已经损坏,Client 会读取其他DataNode 上的 Block。DataNode 还会在其文件创建后周期性验证 CheckSum。

服役新节点

克隆新机器,配置 IP 为 192.168.1.13,hostname 为 hadoop13。并安装 JDK,Hadoop

配置新机器的 hosts 映射,在其他机器添加本机的 hosts 映射

1
2
3
4
192.168.1.10 hadoop10
192.168.1.11 hadoop11
192.168.1.12 hadoop12
192.168.1.13 hadoop13

在 core-site.xml 配置 nn 的地址

1
2
3
4
5
6
7
8
9
10
11
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop10:9000</value>
</property>

<property>
<name>hadoop.tmp.dir</name>
<value>/opt/module/hadoop-2.7.2/data/tmp</value>
</property>
</configuration>

在 yarn-site.xml 配置 rm 的地址

1
2
3
4
5
6
7
8
9
10
11
12
13
<configuration>
<!-- 获取数据的方式 -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>

<!-- 指定YARN的ResourceManager的地址 -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hadoop10</value>
</property>
</configuration>

启动新 DN 节点,然后就可以看到新节点已经加入了

退役旧节点

白名单隔离

只有在白名单里面的节点才能访问 namenode,新建 whitenamelist 文件,内容如下:会将上述加入集群的 hadoop13 隔离,不让其加入集群

1
2
3
hadoop10
hadoop11
hadoop12

在 hdfs-site.xml 中添加如下配置

1
2
3
4
<property>
<name>dfs.hosts</name>
<value>/home/rexyan/whitenamelist</value>
</property>

然后重启 namenode 或者使用下面命令重新读取 dfs.hosts 参数,注意,以下命令只针对配置黑白名单有用

1
hadoop dfsadmin -refreshNodes

配置完成之后就会发现 hadoop13 已经不在集群中了

黑名单退役

恢复 hadoop13 机器,让其重新加入集群。并且上传一个文件到 hadoop13 中,副本数指定为 1(也就是只让 hadoop13 中有这个文件)

1
hadoop fs -Ddfs.replication=1 -put hadoop-hdfs-2.7.2.jar /

然后在 hadoop10 上的 hdfs-site.xml 中配置黑名单,新建 blacknamelist 文件,内容如下:

1
hadoop13

hdfs-site.xml 中新增配置如下:

1
2
3
4
<property>
<name>dfs.hosts.exclude</name>
<value>/home/rexyan/blacknamelist</value>
</property>

执行下面命令刷新节点信息

1
hadoop dfsadmin -refreshNodes

查看状态,会发现 hadoop13 所在的节点处于 Decommissioned In Progress 状态,此状态就是将 hadoop13 的数据移交到其他节点上,因为我们之前上传了文件到 hdfs 中,且该文件是只属于 hadoop13 的。

等待一段时间后状态就会变成 Decommissioned,代表退役完成。

注意⚠️:如果配置了白名单,那么在配置黑名单的时候要保证在白名单中也有此节点。因为白名单是限制访问,如果白名单中没有此节点,那么不能访问也就不能完成退役。

DN 多目录配置

DataNode 也可以配置成多个目录,每个目录存储的数据不一样。即:数据不是副本。注意,这里和 namenode 不一样,不是备份!而是将数据写入新目录。哪个机器需要加添加多目录那么就在哪台机器上进行配置,例如在 hadoop12 上完成多目录的配置,那么就新建一个 new_data 目录,并且对 hdfs-site.xml 添加内容

1
2
3
4
[rexyan@hadoop12 hadoop-2.7.2]$ mkdir new_data # 新建目录,在真实场景中,该目录应该存在于新磁盘上
[rexyan@hadoop12 hadoop-2.7.2]$ pwd # 显示目录路径
/opt/module/hadoop-2.7.2
[rexyan@hadoop12 hadoop-2.7.2]$

编辑 hdfs-site.xml 添加以下内容:

1
2
3
4
<property>
<name>dfs.datanode.data.dir</name>
<value>file:///${hadoop.tmp.dir}/dfs/data,file:///opt/module/hadoop-2.7.2/new_data</value>
</property>

重启 hadoop12 的datanode,然后上传两个文件。

1
2
[rexyan@hadoop12 ~]$ hadoop fs -put file3 /
[rexyan@hadoop12 ~]$ hadoop fs -put file4 /

file3 和 file4 两个文件在都存在 hadoop12 的节点上,且 file3 的块ID 为1073741862,fiel4 的块 ID 为1073741863

查看两个存储数据的目录,会发现一个目录存储了一个文件的块信息

解决HDFS上小文件的存储

  1. 从源头上解决,在上传时,将多个小文件归档,tar -zcvf xxx.tar.gz 小文件列表
  2. 如果小文件已经上传到HDFS了,可以使用在线归档。在线归档的功能实际是一个MR程序,这个程序将HDFS已经存在的多个小文件归档为一个归档文件!